package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/coreos/go-systemd/v22/daemon"
	"github.com/fatih/color"
	"github.com/influxdata/tail/watch"
	"gopkg.in/tomb.v1"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/agent"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/logger"
	"github.com/influxdata/telegraf/plugins/aggregators"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/plugins/processors"
	"github.com/influxdata/telegraf/plugins/secretstores"
)

// stop is selected on by reloadLoop: receiving from it cancels the context of
// the currently running agent without scheduling a reload. The channel is not
// initialized in this part of the file.
var stop chan struct{}

// GlobalFlags holds the platform independent command-line options. Init copies
// the whole struct into the Telegraf instance.
//
// Usage of the fields as far as visible in this file:
//   - config, configDir: config files/URLs and directories walked for config
//     files (getConfigFiles); both are also the targets of the config watchers
//   - testWait: wait time in seconds handed to the agent's Once/Test run; a
//     non-zero value selects test mode just like the test flag
//   - configURLRetryAttempts: copied to the agent's ConfigURLRetryAttempts
//   - configURLWatchInterval: a value > 0 enables polling of remote config
//     URLs at that interval
//   - watchConfig, watchInterval: a non-empty watchConfig enables watching of
//     local configs, "poll" selects the polling watcher (using watchInterval
//     if > 0), any other value the inotify watcher
//   - pidFile: file the process ID is written to while the agent runs
//   - plugindDir: when empty, a configuration without inputs is rejected
//   - password: global password handed to the config package
//   - oldEnvBehavior, printPluginConfigSource: forwarded to the config package
//   - test, once: run the agent in test mode or for a single iteration
//   - debug, quiet: logging verbosity, combined with the agent settings
//   - unprotected: disables secret protection
type GlobalFlags struct {
	config                  []string
	configDir               []string
	testWait                int
	configURLRetryAttempts  int
	configURLWatchInterval  time.Duration
	watchConfig             string
	watchInterval           time.Duration
	pidFile                 string
	plugindDir              string
	password                string
	oldEnvBehavior          bool
	printPluginConfigSource bool
	test                    bool
	debug                   bool
	once                    bool
	quiet                   bool
	unprotected             bool
}

// WindowFlags holds the service related command-line options. Init copies the
// struct into the Telegraf instance; none of the fields is read in this part
// of the file.
// NOTE(review): presumably consumed by the Windows service handling only,
// confirm against the platform specific code.
type WindowFlags struct {
	service             string
	serviceName         string
	serviceDisplayName  string
	serviceRestartDelay string
	serviceAutoRestart  bool
	console             bool
}

// App is the interface of the application driven by the command-line layer.
// Telegraf provides Init and the secret store commands in this file.
type App interface {
	// Init hands over the pprof error channel, the plugin filters and the
	// parsed command-line flags.
	Init(<-chan error, Filters, GlobalFlags, WindowFlags)
	// Run starts the application and returns the error it terminated with.
	Run() error

	// Secret store commands

	// ListSecretStores returns the IDs of the configured secret stores.
	ListSecretStores() ([]string, error)
	// GetSecretStore returns the configured secret store with the given ID.
	GetSecretStore(string) (telegraf.SecretStore, error)
}

// Telegraf is the App implementation. It keeps the command-line settings
// handed over via Init together with the state needed to (re)load the
// configuration and run the agent.
type Telegraf struct {
	// pprofErr delivers failures of the pprof server; reloadLoop cancels the
	// running agent when an error arrives.
	pprofErr <-chan error

	// Plugin filters set by Init and the list of config files/URLs resolved
	// by getConfigFiles.
	inputFilters       []string
	outputFilters      []string
	configFiles        []string
	secretstoreFilters []string

	// cfg is the configuration used by runAgent when no reload is requested;
	// it is assigned outside of this part of the file.
	cfg *config.Config

	// Embedded command-line flags, their fields are accessed directly (e.g.
	// t.quiet).
	GlobalFlags
	WindowFlags
}

// Init stores the pprof error channel, the plugin filters and the command-line
// flags on the instance and applies those flags that map to package-level
// settings of the config package.
func (t *Telegraf) Init(pprofErr <-chan error, f Filters, g GlobalFlags, w WindowFlags) {
	t.GlobalFlags, t.WindowFlags = g, w
	t.inputFilters, t.outputFilters, t.secretstoreFilters = f.input, f.output, f.secretstore
	t.pprofErr = pprofErr

	// Secret protection must be switched off before any other operation,
	// especially before the password secret below is created.
	if t.unprotected {
		log.Println("W! Running without secret protection!")
		config.DisableSecretProtection()
	}

	// Only an explicitly given password replaces the global one.
	if t.password != "" {
		config.Password = config.NewSecret([]byte(t.password))
	}

	// Environment replacement behavior and plugin source reporting.
	config.OldEnvVarReplacement = t.oldEnvBehavior
	config.PrintPluginConfigSource = t.printPluginConfigSource
}

// ListSecretStores loads the configuration and returns the IDs of all secret
// stores defined in it. The IDs come from a map iteration, so their order is
// not defined.
func (t *Telegraf) ListSecretStores() ([]string, error) {
	cfg, err := t.loadConfiguration()
	if err != nil {
		return nil, err
	}

	// The number of stores is known, so fill a fixed-size slice by index.
	names := make([]string, len(cfg.SecretStores))
	next := 0
	for name := range cfg.SecretStores {
		names[next] = name
		next++
	}
	return names, nil
}

// GetSecretStore loads the configuration with the quiet flag forced on and
// returns the secret store registered under id. An error is returned if the
// configuration cannot be loaded or no store with that ID exists.
func (t *Telegraf) GetSecretStore(id string) (telegraf.SecretStore, error) {
	t.quiet = true

	cfg, err := t.loadConfiguration()
	if err != nil {
		return nil, err
	}

	if store, ok := cfg.SecretStores[id]; ok {
		return store, nil
	}
	return nil, errors.New("unknown secret store")
}

// reloadLoop runs the agent and restarts it with a freshly loaded
// configuration whenever a SIGHUP arrives, either from the operating system
// or from one of the config watchers started here. Any other signal, a pprof
// server error or a message on stop cancel the agent and end the loop.
//
// It returns nil when the loop ends without a pending reload and a wrapped
// error if the agent fails with anything but context.Canceled.
func (t *Telegraf) reloadLoop() error {
	reloadConfig := false
	// reload always holds exactly one value telling whether another iteration
	// should follow: it starts out as true, every iteration resets it to
	// false and the SIGHUP handling below swaps that for true again.
	reload := make(chan bool, 1)
	reload <- true
	for <-reload {
		reload <- false
		ctx, cancel := context.WithCancel(context.Background())

		// Fresh signal channel per iteration, also used by the watchers to
		// inject a SIGHUP.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
			syscall.SIGTERM, syscall.SIGINT)
		// Watch local config files and directories if requested. Targets
		// that cannot be stat'ed are skipped with a warning.
		if t.watchConfig != "" {
			for _, fConfig := range t.configFiles {
				if isURL(fConfig) {
					continue
				}

				if _, err := os.Stat(fConfig); err != nil {
					log.Printf("W! Cannot watch config %s: %s", fConfig, err)
				} else {
					go t.watchLocalConfig(ctx, signals, fConfig)
				}
			}
			for _, fConfigDirectory := range t.configDir {
				if _, err := os.Stat(fConfigDirectory); err != nil {
					log.Printf("W! Cannot watch config directory %s: %s", fConfigDirectory, err)
				} else {
					go t.watchLocalConfig(ctx, signals, fConfigDirectory)
				}
			}
		}
		// Poll remote configs (URLs) with a single watcher if an interval
		// is set.
		if t.configURLWatchInterval > 0 {
			remoteConfigs := make([]string, 0)
			for _, fConfig := range t.configFiles {
				if isURL(fConfig) {
					remoteConfigs = append(remoteConfigs, fConfig)
				}
			}
			if len(remoteConfigs) > 0 {
				go t.watchRemoteConfigs(ctx, signals, t.configURLWatchInterval, remoteConfigs)
			}
		}
		// Wait for the first event of this iteration and cancel the agent.
		go func() {
			select {
			case sig := <-signals:
				if sig == syscall.SIGHUP {
					log.Println("I! Reloading Telegraf config")
					// May need to update the list of known config files
					// if a delete or create occurred. That way on the reload
					// we ensure we watch the correct files.
					if err := t.getConfigFiles(); err != nil {
						log.Println("E! Error loading config files: ", err)
					}
					// Replace the pending false by true to request another
					// iteration.
					<-reload
					reload <- true
				}
				cancel()
			case err := <-t.pprofErr:
				log.Printf("E! pprof server failed: %v", err)
				cancel()
			case <-stop:
				cancel()
			}
		}()

		// Blocks until the agent finishes; only the first iteration uses the
		// already available configuration.
		err := t.runAgent(ctx, reloadConfig)
		if err != nil && !errors.Is(err, context.Canceled) {
			return fmt.Errorf("[telegraf] Error running agent: %w", err)
		}
		reloadConfig = true
	}

	return nil
}

// watchLocalConfig watches a single local config file or directory and
// requests a reload by sending SIGHUP on signals after the first modification,
// deletion, truncation or creation event. The watcher is one-shot: it returns
// after the first event, when the watcher itself dies or when ctx is
// cancelled.
//
// With watchConfig set to "poll" a polling watcher is used (with watchInterval
// if positive), otherwise an inotify based one.
func (t *Telegraf) watchLocalConfig(ctx context.Context, signals chan os.Signal, fConfig string) {
	var mytomb tomb.Tomb
	var watcher watch.FileWatcher
	if t.watchConfig == "poll" {
		if t.watchInterval > 0 {
			watcher = watch.NewPollingFileWatcherWithDuration(fConfig, t.watchInterval)
		} else {
			watcher = watch.NewPollingFileWatcher(fConfig)
		}
	} else {
		watcher = watch.NewInotifyFileWatcher(fConfig)
	}
	changes, err := watcher.ChangeEvents(&mytomb, 0)
	if err != nil {
		log.Printf("E! Error watching config file/directory %q: %s\n", fConfig, err)
		return
	}
	log.Printf("I! Config watcher started for %s\n", fConfig)
	select {
	case <-ctx.Done():
		mytomb.Done()
		return
	case <-changes.Modified:
		log.Printf("I! Config file/directory %q modified\n", fConfig)
	case <-changes.Deleted:
		// Deleted can also mean moved. Wait a bit and check existence, but
		// do not delay the shutdown if the agent is stopped in the meantime.
		select {
		case <-ctx.Done():
			mytomb.Done()
			return
		case <-time.After(time.Second):
		}
		if _, err := os.Stat(fConfig); err == nil {
			log.Printf("I! Config file/directory %q overwritten\n", fConfig)
		} else {
			log.Printf("W! Config file/directory %q deleted\n", fConfig)
		}
	case <-changes.Truncated:
		log.Printf("I! Config file/directory %q truncated\n", fConfig)
	case <-changes.Created:
		log.Printf("I! Config directory %q has new file(s)\n", fConfig)
	case <-mytomb.Dying():
		log.Printf("I! Config watcher %q ended\n", fConfig)
		return
	}
	mytomb.Done()

	// Request the reload. The receiver may already have stopped reading from
	// the channel (e.g. because another watcher fired first), so never block
	// beyond the cancellation of the context to avoid leaking this goroutine.
	select {
	case signals <- syscall.SIGHUP:
	case <-ctx.Done():
	}
}

// watchRemoteConfigs polls the given config URLs with a HEAD request every
// interval and requests a reload by sending SIGHUP on signals as soon as the
// Last-Modified header of one URL differs from the value seen before. It
// returns after requesting a reload or when ctx is cancelled.
//
// The signals channel is only written to. It is shared with the receiver of
// the operating system signals, so receiving from it here could swallow a
// signal (e.g. SIGTERM) that was meant to stop or reload the agent.
func (*Telegraf) watchRemoteConfigs(ctx context.Context, signals chan os.Signal, interval time.Duration, remoteConfigs []string) {
	configs := strings.Join(remoteConfigs, ", ")
	log.Printf("I! Remote config watcher started for: %s\n", configs)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Last-Modified value seen per URL; a missing entry means there is no
	// baseline to compare against yet.
	lastModified := make(map[string]string, len(remoteConfigs))
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, configURL := range remoteConfigs {
				// Bind the request to ctx so a hanging server cannot keep
				// the watcher alive after cancellation.
				req, err := http.NewRequestWithContext(ctx, http.MethodHead, configURL, nil)
				if err != nil {
					log.Printf("W! Error fetching config URL, %s: %s\n", configURL, err)
					continue
				}
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					if ctx.Err() != nil {
						// Aborted by the cancellation, not a fetch problem.
						return
					}
					log.Printf("W! Error fetching config URL, %s: %s\n", configURL, err)
					continue
				}
				resp.Body.Close()

				modified := resp.Header.Get("Last-Modified")
				if modified == "" {
					// Without the header no comparison is possible. Forget
					// the baseline so a later value is recorded instead of
					// being reported as a change.
					log.Printf("E! Last-Modified header not found, stopping the watcher for %s\n", configURL)
					delete(lastModified, configURL)
					continue
				}

				previous, known := lastModified[configURL]
				if !known {
					lastModified[configURL] = modified
					continue
				}
				if previous != modified {
					log.Printf("I! Remote config modified: %s\n", configURL)
					// Never block beyond the cancellation: the receiver may
					// already be gone or the channel buffer may be full.
					select {
					case signals <- syscall.SIGHUP:
					case <-ctx.Done():
					}
					return
				}
			}
		}
	}
}

// loadConfiguration creates a new configuration seeded with the quiet flag,
// the URL retry attempts and the plugin filters, refreshes the list of config
// files and loads all of them. The configuration object is returned even if
// an error occurred, in which case it may be only partially loaded.
func (t *Telegraf) loadConfiguration() (*config.Config, error) {
	cfg := config.NewConfig()

	// Command-line settings
	cfg.Agent.Quiet = t.quiet
	cfg.Agent.ConfigURLRetryAttempts = t.configURLRetryAttempts

	// Plugin filters
	cfg.InputFilters = t.inputFilters
	cfg.OutputFilters = t.outputFilters
	cfg.SecretStoreFilters = t.secretstoreFilters

	// Only load the files if the file list could be determined.
	err := t.getConfigFiles()
	if err == nil {
		err = cfg.LoadAll(t.configFiles...)
	}
	return cfg, err
}

// getConfigFiles determines the config files to load and stores them in
// t.configFiles: the explicitly given files come first, followed by the files
// found in the config directories. If that yields nothing, the default config
// paths are used. On error t.configFiles is left untouched.
func (t *Telegraf) getConfigFiles() error {
	// Start with a copy of the explicit files to keep t.config unmodified.
	files := append([]string(nil), t.config...)

	for _, dir := range t.configDir {
		found, err := config.WalkDirectory(dir)
		if err != nil {
			return err
		}
		files = append(files, found...)
	}

	// Fall back to the default config paths if nothing was specified or found.
	if len(files) == 0 {
		defaults, err := config.GetDefaultConfigPath()
		if err != nil {
			return fmt.Errorf("unable to load default config paths: %w", err)
		}
		files = append(files, defaults...)
	}

	t.configFiles = files
	return nil
}

// runAgent validates the configuration, sets up logging, reports the loaded
// plugins and runs the agent until it finishes or ctx is cancelled.
//
// With reloadConfig set the configuration is loaded anew, otherwise the
// configuration stored in t.cfg is used. Depending on the flags the agent runs
// once (once), in test mode (test or a non-zero testWait) or continuously. In
// the continuous case the process ID is written to pidFile, if set, and the
// file is removed again when the agent returns.
//
// It returns the error of the configuration loading, the validation, the
// logging setup or the agent run.
func (t *Telegraf) runAgent(ctx context.Context, reloadConfig bool) error {
	c := t.cfg
	var err error
	if reloadConfig {
		if c, err = t.loadConfiguration(); err != nil {
			return err
		}
	}

	// Outputs are not required in test mode, inputs might be provided by
	// external plugins.
	if !(t.test || t.testWait != 0) && len(c.Outputs) == 0 {
		return errors.New("no outputs found, probably invalid config file provided")
	}
	if t.plugindDir == "" && len(c.Inputs) == 0 {
		return errors.New("no inputs found, probably invalid config file provided")
	}

	if int64(c.Agent.Interval) <= 0 {
		return fmt.Errorf("agent interval must be positive, found %v", c.Agent.Interval)
	}

	if int64(c.Agent.FlushInterval) <= 0 {
		return fmt.Errorf("agent flush_interval must be positive; found %v", c.Agent.FlushInterval)
	}

	// Setup logging as configured.
	logConfig := &logger.Config{
		Debug:                   c.Agent.Debug || t.debug,
		Quiet:                   c.Agent.Quiet || t.quiet,
		LogTarget:               c.Agent.LogTarget,
		LogFormat:               c.Agent.LogFormat,
		Logfile:                 c.Agent.Logfile,
		StructuredLogMessageKey: c.Agent.StructuredLogMessageKey,
		RotationInterval:        time.Duration(c.Agent.LogfileRotationInterval),
		RotationMaxSize:         int64(c.Agent.LogfileRotationMaxSize),
		RotationMaxArchives:     c.Agent.LogfileRotationMaxArchives,
		LogWithTimezone:         c.Agent.LogWithTimezone,
	}

	if err := logger.SetupLogging(logConfig); err != nil {
		return err
	}

	log.Printf("I! Starting Telegraf %s%s brought to you by InfluxData the makers of InfluxDB", internal.Version, internal.Customized)
	log.Printf("I! Available plugins: %d inputs, %d aggregators, %d processors, %d parsers, %d outputs, %d secret-stores",
		len(inputs.Inputs),
		len(aggregators.Aggregators),
		len(processors.Processors),
		len(parsers.Parsers),
		len(outputs.Outputs),
		len(secretstores.SecretStores),
	)
	log.Printf("I! Loaded inputs: %s\n%s", strings.Join(c.InputNames(), " "), c.InputNamesWithSources())
	log.Printf("I! Loaded aggregators: %s\n%s", strings.Join(c.AggregatorNames(), " "), c.AggregatorNamesWithSources())
	log.Printf("I! Loaded processors: %s\n%s", strings.Join(c.ProcessorNames(), " "), c.ProcessorNamesWithSources())
	log.Printf("I! Loaded secretstores: %s\n%s", strings.Join(c.SecretstoreNames(), " "), c.SecretstoreNamesWithSources())
	if !t.once && (t.test || t.testWait != 0) {
		log.Print("W! " + color.RedString("Outputs are not used in testing mode!"))
	} else {
		log.Printf("I! Loaded outputs: %s\n%s", strings.Join(c.OutputNames(), " "), c.OutputNamesWithSources())
	}
	log.Printf("I! Tags enabled: %s", c.ListTags())

	// Report the number of deprecated plugins and options per category.
	for _, category := range []string{"inputs", "aggregators", "processors", "outputs", "secretstores"} {
		if count, found := c.Deprecations[category]; found && (count[0] > 0 || count[1] > 0) {
			log.Printf("W! Deprecated %s: %d and %d options", category, count[0], count[1])
		}
	}

	// Compute the amount of locked memory needed for the secrets
	if !t.unprotected {
		required := 3 * c.NumberSecrets * uint64(os.Getpagesize())
		available := getLockedMemoryLimit()
		if required > available {
			required /= 1024
			available /= 1024
			log.Printf("I! Found %d secrets...", c.NumberSecrets)
			msg := fmt.Sprintf("Insufficient lockable memory %dkb when %dkb is required.", available, required)
			msg += " Please increase the limit for Telegraf in your Operating System!"
			log.Print("W! " + color.RedString(msg))
		}
	}
	ag := agent.NewAgent(c)

	// Notify systemd that telegraf is ready
	// SdNotify() only tries to notify if the NOTIFY_SOCKET environment is set, so it's safe to call when systemd isn't present.
	// Ignore the return values here because they're not valid for platforms that don't use systemd.
	// For platforms that use systemd, telegraf doesn't log if the notification failed.
	//nolint:errcheck // see above
	daemon.SdNotify(false, daemon.SdNotifyReady)

	if t.once {
		wait := time.Duration(t.testWait) * time.Second
		return ag.Once(ctx, wait)
	}

	if t.test || t.testWait != 0 {
		wait := time.Duration(t.testWait) * time.Second
		return ag.Test(ctx, wait)
	}

	if t.pidFile != "" {
		// Truncate an existing file, otherwise the trailing bytes of a longer
		// stale process ID would survive the write.
		f, err := os.OpenFile(t.pidFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640)
		if err != nil {
			// A missing pidfile is not fatal, run without it.
			log.Printf("E! Unable to create pidfile: %s", err)
		} else {
			if _, err := fmt.Fprintf(f, "%d\n", os.Getpid()); err != nil {
				log.Printf("E! Unable to write pidfile: %s", err)
			}

			err = f.Close()
			if err != nil {
				return err
			}

			defer func() {
				err := os.Remove(t.pidFile)
				if err != nil {
					log.Printf("E! Unable to remove pidfile: %s", err)
				}
			}()
		}
	}

	return ag.Run(ctx)
}

// isURL reports whether str parses as a URL that carries both a scheme and a
// host. Plain file paths and scheme-only references such as "file:///path"
// are not considered URLs.
func isURL(str string) bool {
	parsed, err := url.Parse(str)
	if err != nil {
		return false
	}
	return parsed.Scheme != "" && parsed.Host != ""
}
